package org.pentaho.obscommon;

import com.obs.services.model.*;
import org.pentaho.di.core.logging.LogChannel;
import org.pentaho.di.core.logging.LogChannelInterface;
import org.pentaho.di.i18n.BaseMessages;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * Output stream that forwards everything written to it, through a pipe, to a background task
 * which uploads the bytes to an OBS object using the multipart-upload API.
 *
 * <p>The background task is submitted lazily on the first {@code write}; if nothing is ever
 * written, no upload is started. By default {@link #close()} waits for the upload to finish.
 */
public class OBSCommonPipedOutputStream extends PipedOutputStream {

    private static final Class<?> PKG = OBSCommonPipedOutputStream.class;
    private static final Logger logger = LoggerFactory.getLogger( OBSCommonPipedOutputStream.class );
    private static final LogChannelInterface consoleLog = new LogChannel( BaseMessages.getString( PKG, "TITLE.OBSFile" ) );

    /** Threshold (5 MiB) above which the buffered data is flushed as one upload part. */
    private static final int PART_SIZE = 5242880;

    private final ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
    private boolean initialized = false;
    private boolean blockedUntilDone = true;
    private final PipedInputStream pipedInputStream = new PipedInputStream();
    private OBSAsyncTransferRunner obsAsyncTransferRunner;
    private OBSCommonFileSystem fileSystem;

    private Future<Boolean> result = null;
    private String bucketId;
    private String key;
    private Boolean bAppend;
    private String empty;


    /**
     * Creates the stream and connects it to the internal pipe read by the upload task.
     *
     * @param fileSystem file system whose OBS client performs the upload
     * @param bucketId   target bucket
     * @param key        target object key
     * @param bAppend    when {@code true}, {@code empty} is written into the part buffer before
     *                   each chunk read from the pipe
     * @param empty      text (encoded as UTF-8) inserted when {@code bAppend} is {@code true}
     * @throws RuntimeException if the pipe cannot be connected
     */
    public OBSCommonPipedOutputStream(OBSCommonFileSystem fileSystem, String bucketId, String key, boolean bAppend, String empty) {
        try {
            this.pipedInputStream.connect(this);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        this.obsAsyncTransferRunner = new OBSAsyncTransferRunner();
        this.bucketId = bucketId;
        this.key = key;
        this.fileSystem = fileSystem;
        this.bAppend = bAppend;
        this.empty = empty;
    }

    /** Submits the upload task the first time it is called; later calls do nothing. */
    private void initializeWrite() {
        if (!this.initialized) {
            this.initialized = true;
            this.result = this.executor.submit(this.obsAsyncTransferRunner);
        }
    }

    /**
     * @return {@code true} if {@link #close()} waits for the upload task to finish
     */
    public boolean isBlockedUntilDone() {
        return this.blockedUntilDone;
    }

    /**
     * @param blockedUntilDone whether {@link #close()} should wait for the upload task to finish
     */
    public void setBlockedUntilDone(boolean blockedUntilDone) {
        this.blockedUntilDone = blockedUntilDone;
    }

    /**
     * Writes one byte to the pipe, starting the upload task first if it is not running yet.
     *
     * @param b the byte to write
     * @throws IOException if the pipe rejects the write
     */
    @Override
    public void write(int b) throws IOException {
        this.initializeWrite();
        super.write(b);
    }

    /**
     * Writes a range of bytes to the pipe, starting the upload task first if it is not running yet.
     *
     * @param b   the data
     * @param off the start offset in the data
     * @param len the number of bytes to write
     * @throws IOException if the pipe rejects the write
     */
    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        this.initializeWrite();
        super.write(b, off, len);
    }

    /**
     * Closes the pipe and, if an upload was started and {@link #isBlockedUntilDone()} is
     * {@code true}, waits for it to finish. The executor is always shut down.
     *
     * @throws InterruptedIOException if the calling thread is interrupted while waiting; the
     *                                interrupt flag is restored
     * @throws IOException            if closing the pipe fails or the awaited upload did not
     *                                complete successfully
     */
    @Override
    public void close() throws IOException {
        try {
            super.close();
            if (this.initialized && this.isBlockedUntilDone()) {
                this.awaitTransfer();
            }
        } finally {
            this.executor.shutdown();
        }
    }

    /** Blocks until the upload task finishes and converts a failed outcome into an IOException. */
    private void awaitTransfer() throws IOException {
        boolean succeeded;
        try {
            succeeded = Boolean.TRUE.equals(this.result.get());
        } catch (InterruptedException e) {
            // Restore the flag and stop waiting; looping here would spin because every
            // subsequent blocking call would be interrupted again immediately.
            Thread.currentThread().interrupt();
            InterruptedIOException interrupted =
                    new InterruptedIOException("Interrupted while waiting for OBS upload of " + this.bucketId + "/" + this.key);
            interrupted.initCause(e);
            throw interrupted;
        } catch (ExecutionException e) {
            throw new IOException("OBS upload of " + this.bucketId + "/" + this.key + " failed", e.getCause());
        }
        if (!succeeded) {
            throw new IOException("OBS upload of " + this.bucketId + "/" + this.key + " did not complete; see log for details");
        }
    }

    /**
     * Background task that drains the pipe and uploads its content as a multipart upload.
     */
    class OBSAsyncTransferRunner implements Callable<Boolean> {
        OBSAsyncTransferRunner() {
        }

        /**
         * Reads the pipe until end of stream, uploading a part whenever more than
         * {@code PART_SIZE} bytes are buffered, then uploads the remainder and completes the
         * multipart upload.
         *
         * @return {@code true} if the upload completed, {@code false} if it failed (the failure
         *         is logged and, when an upload id exists, the multipart upload is aborted)
         */
        @Override
        public Boolean call() throws Exception {
            List<PartEtag> partETags = new ArrayList<>();
            InitiateMultipartUploadRequest initRequest =
                    new InitiateMultipartUploadRequest(OBSCommonPipedOutputStream.this.bucketId, OBSCommonPipedOutputStream.this.key);
            InitiateMultipartUploadResult initResponse = null;

            // Resources close in reverse order: the buffered reader (and with it the pipe) first.
            try (ByteArrayOutputStream baos = new ByteArrayOutputStream(PART_SIZE);
                 BufferedInputStream bis = new BufferedInputStream(OBSCommonPipedOutputStream.this.pipedInputStream, PART_SIZE)) {

                initResponse = OBSCommonPipedOutputStream.this.fileSystem.getOBSClient().initiateMultipartUpload(initRequest);
                String uploadId = initResponse.getUploadId();

                byte[] readBuffer = new byte[PART_SIZE];
                // Encode once; sizes below are taken from the buffer itself so they are always
                // byte counts, never character counts.
                byte[] appendBytes = bAppend ? empty.getBytes(StandardCharsets.UTF_8) : null;
                long offset = 0L;
                int partNum = 1;
                int read;

                logger.info( BaseMessages.getString( PKG, "INFO.OBSMultiPart.Start" ) );
                while ((read = bis.read(readBuffer)) >= 0) {
                    // NOTE(review): the text is inserted before every chunk read from the pipe,
                    // not once per stream - confirm this is intended.
                    if (appendBytes != null) {
                        baos.write(appendBytes);
                    }
                    if (read > 0) {
                        baos.write(readBuffer, 0, read);
                    }
                    if (baos.size() > PART_SIZE) {
                        byte[] part = baos.toByteArray();
                        partETags.add(this.uploadPart(uploadId, partNum++, offset, part));
                        offset += part.length;
                        baos.reset();
                    }
                }

                // Whatever is left (possibly nothing) goes out as the final part.
                partETags.add(this.uploadPart(uploadId, partNum, offset, baos.toByteArray()));

                CompleteMultipartUploadRequest compRequest = new CompleteMultipartUploadRequest(
                        OBSCommonPipedOutputStream.this.bucketId, OBSCommonPipedOutputStream.this.key, uploadId, partETags);
                OBSCommonPipedOutputStream.this.fileSystem.getOBSClient().completeMultipartUpload(compRequest);
                return true;
            } catch (Exception e) {
                logger.error("OBS multipart upload of " + OBSCommonPipedOutputStream.this.bucketId + "/"
                        + OBSCommonPipedOutputStream.this.key + " failed", e);
                // The pipe's read side is already closed by the try-with-resources above, so a
                // writer that is still active gets an IOException on its next write. The outer
                // close() must not be called from here: it waits for this very task to finish.
                if (initResponse != null) {
                    this.abortQuietly(initResponse.getUploadId());
                }
                return false;
            }
        }

        /**
         * Uploads one part and returns its ETag entry.
         * Declared with a broad throws clause because the stream wrapper and SDK calls used
         * here are defined outside this file.
         */
        private PartEtag uploadPart(String uploadId, int partNumber, long offset, byte[] data) throws Exception {
            OBSCommonWindowedSubstream partStream = new OBSCommonWindowedSubstream(data);
            UploadPartRequest uploadRequest = new UploadPartRequest(
                    OBSCommonPipedOutputStream.this.bucketId, OBSCommonPipedOutputStream.this.key, (long) data.length, partStream);
            uploadRequest.setUploadId(uploadId);
            uploadRequest.setPartNumber(partNumber);
            uploadRequest.setOffset(offset);
            UploadPartResult uploadPartResult = OBSCommonPipedOutputStream.this.fileSystem.getOBSClient().uploadPart(uploadRequest);
            return new PartEtag(uploadPartResult.getEtag(), uploadPartResult.getPartNumber());
        }

        /** Aborts the multipart upload; a failure to abort is logged instead of propagated. */
        private void abortQuietly(String uploadId) {
            try {
                OBSCommonPipedOutputStream.this.fileSystem.getOBSClient().abortMultipartUpload(
                        new AbortMultipartUploadRequest(OBSCommonPipedOutputStream.this.bucketId, OBSCommonPipedOutputStream.this.key, uploadId));
            } catch (Exception abortFailure) {
                logger.error("Failed to abort OBS multipart upload " + uploadId, abortFailure);
            }
        }
    }
}
